package com.taobao.top.link.remoting;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

import remoting.protocol.NotSupportedException;
import remoting.protocol.tcp.TcpContentDelimiter;
import remoting.protocol.tcp.TcpOperations;
import remoting.protocol.tcp.TcpTransportHeader;

import com.taobao.top.link.BufferManager;
import com.taobao.top.link.DefaultLoggerFactory;
import com.taobao.top.link.Logger;
import com.taobao.top.link.LoggerFactory;
import com.taobao.top.link.channel.ChannelContext;
import com.taobao.top.link.channel.ChannelException;
import com.taobao.top.link.channel.ChannelSender.SendHandler;
import com.taobao.top.link.channel.SimpleChannelHandler;
import com.taobao.top.link.remoting.protocol.RemotingTcpProtocolHandle;
import com.taobao.top.link.remoting.protocol.RemotingTransportHeader;

public abstract class RemotingServerChannelHandler extends SimpleChannelHandler {
  protected Logger             logger;
  private ExecutorService      threadPool;
  private SerializationFactory serializationFactory = new DefaultSerializationFactory();

  public RemotingServerChannelHandler() {
    this(DefaultLoggerFactory.getDefault());
  }

  public RemotingServerChannelHandler(LoggerFactory loggerFactory) {
    this.setLoggerFactory(loggerFactory);
  }

  public void setThreadPool(ExecutorService threadPool) {
    this.threadPool = threadPool;
  }

  public void setLoggerFactory(LoggerFactory loggerFactory) {
    this.logger = loggerFactory.create(this);
  }

  public void setSerializationFactory(SerializationFactory serializationFactory) {
    this.serializationFactory = serializationFactory;
  }

  public abstract MethodReturn onMethodCall(MethodCall methodCall, MethodCallContext callContext) throws Throwable;

  @SuppressWarnings("unchecked")
  @Override
  public final void onMessage(final ChannelContext context) throws ChannelException, NotSupportedException {
    Object msg = context.getMessage();

    if (msg instanceof ByteBuffer) {
      this.onMessage(context, (ByteBuffer) msg);
      return;
    }

    for (ByteBuffer buffer : (List<ByteBuffer>) msg)
      this.onMessage(context, buffer);
  }

  private void onMessage(final ChannelContext context, ByteBuffer buffer) throws ChannelException, NotSupportedException {
    final RemotingTcpProtocolHandle protocol = new RemotingTcpProtocolHandle(buffer);
    protocol.ReadPreamble();
    protocol.ReadMajorVersion();
    protocol.ReadMinorVersion();
    final short operation = protocol.ReadOperation();
    protocol.ReadContentDelimiter();
    protocol.ReadContentLength();
    final HashMap<String, Object> transportHeaders = protocol.ReadTransportHeaders();
    final MethodCallContext callContext = this.createCallContext(context, transportHeaders);
    final Serializer serializer = this.serializationFactory.get(transportHeaders.get(RemotingTransportHeader.Format));
    Object flag = transportHeaders.get(RemotingTransportHeader.Flag);
    transportHeaders.clear();
    transportHeaders.put(RemotingTransportHeader.Flag, flag);

    // just use netty io-woker thread, count=cpucore
    if (this.threadPool == null) {
      this.internalOnMessage(context, callContext, protocol, operation, transportHeaders, serializer);
      return;
    }

    // dispatch to business workers
    try {
      this.threadPool.execute(new Runnable() {
        @Override
        public void run() {
          try {
            internalOnMessage(context, callContext, protocol, operation, transportHeaders, serializer);
          } catch (ChannelException e) {
            logger.error(e);
          }
        }
      });
    } catch (RejectedExecutionException exception) {
      String statusPhrase = String.format("server threadpool full, threadpool maxsize is: %s", ((ThreadPoolExecutor) this.threadPool).getMaximumPoolSize());
      this.logger.fatal(statusPhrase);
      transportHeaders.put(TcpTransportHeader.StatusCode, 500);
      transportHeaders.put(TcpTransportHeader.StatusPhrase, statusPhrase);
      this.reply(context, transportHeaders, null);
    }
  }

  private void internalOnMessage(ChannelContext context, MethodCallContext callContext, RemotingTcpProtocolHandle protocol, short operation,
      HashMap<String, Object> transportHeaders, Serializer serializer) throws ChannelException {
    // get method return
    MethodCall methodCall = null;
    MethodReturn methodReturn = null;
    try {
      methodCall = serializer.deserializeMethodCall(protocol.ReadContent());
      methodReturn = this.onMethodCall(methodCall, callContext);
    } catch (Throwable e) {
      this.logger.error(e);
      methodReturn = new MethodReturn();
      methodReturn.Exception = e;
    }

    // oneway?
    if (operation == TcpOperations.OneWayRequest)
      return;

    byte[] data = null;
    try {
      data = serializer.serializeMethodReturn(methodReturn);
    } catch (FormatterException e) {
      this.logger.error(e);
      transportHeaders.put(TcpTransportHeader.StatusCode, 400);
      transportHeaders.put(TcpTransportHeader.StatusPhrase, e.getMessage());
    }
    this.reply(context, transportHeaders, data);
  }

  private void reply(ChannelContext context, HashMap<String, Object> transportHeaders, byte[] data) throws ChannelException {
    final ByteBuffer responseBuffer = BufferManager.getBuffer();
    RemotingTcpProtocolHandle handle = new RemotingTcpProtocolHandle(responseBuffer);
    handle.WritePreamble();
    handle.WriteMajorVersion();
    handle.WriteMinorVersion();
    handle.WriteOperation(TcpOperations.Reply);
    handle.WriteContentDelimiter(TcpContentDelimiter.ContentLength);
    handle.WriteContentLength(data != null ? data.length : 0);
    handle.WriteTransportHeaders(transportHeaders);
    if (data != null)
      handle.WriteContent(data);

    responseBuffer.flip();
    context.reply(responseBuffer, new SendHandler() {
      @Override
      public void onSendComplete() {
        BufferManager.returnBuffer(responseBuffer);
      }
    });
  }

  private MethodCallContext createCallContext(ChannelContext channelContext, Map<String, Object> headers) {
    MethodCallContext callContext = new MethodCallContext(channelContext.getSender());
    for (Entry<String, Object> h : headers.entrySet())
      callContext.setCallContext(h.getKey(), h.getValue());
    return callContext;
  }
}
